Skip to content

Conversation

@michalnarwojsz-abax
Copy link
Contributor

@michalnarwojsz-abax michalnarwojsz-abax commented Nov 28, 2025

User description

In case Google PubSub subscription encounters an unrecoverable error, perform normal connection dropped handling: call the OnDropped callback and try to reconnect.


Auto-created Ticket

#478

PR Type

Enhancement


Description

  • Add automatic reconnection handling for Google PubSub subscription failures

  • Monitor subscriber task for unrecoverable errors and dropped connections

  • Call OnDropped callback with appropriate reason on subscription termination

  • Ensure proper cleanup of monitor task during unsubscribe


Diagram Walkthrough

flowchart LR
  A["Subscribe starts<br/>_subscriberTask"] --> B["MonitorSubscriberTask<br/>monitors task"]
  B --> C{"Task completes<br/>or fails?"}
  C -->|Normal completion| D["Dropped callback<br/>DropReason.Stopped"]
  C -->|Unrecoverable error| E["Dropped callback<br/>DropReason.ServerError"]
  C -->|Cancellation| F["Expected shutdown"]
  D --> G["Reconnection triggered"]
  E --> G
Loading

File Walkthrough

Relevant files
Enhancement
GooglePubSubSubscription.cs
Add subscriber task monitoring and error handling               

src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs

  • Added _monitorTask field to track the monitoring task lifecycle
  • Implemented MonitorSubscriberTask method to handle subscriber task
    completion and failures
  • Integrated monitoring task startup in Subscribe method after starting
    subscriber
  • Enhanced Unsubscribe method to properly await and clean up the monitor
    task
  • Handles three scenarios: normal completion, operation cancellation,
    and unrecoverable errors
+19/-0   

In case Google PubSub subscription encounters an unrecoverable
error, perform normal connection dropped handling: call the
OnDropped callback and try to reconnect.
@qodo-free-for-open-source-projects
Copy link
Contributor

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
🟢
No codebase code duplication found No new components were introduced in the PR code
Custom Compliance
🟢
Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Null task handling: The _monitorTask field is initialized to null! but may not be set if Subscribe fails
before line 96, potentially causing null reference issues in Unsubscribe.

Referred Code
Task _monitorTask    = null!;

Learn more about managing compliance generic rules or creating your own custom rules

Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-free-for-open-source-projects
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Handle all cancellation scenarios correctly

Add a catch (OperationCanceledException) block to MonitorSubscriberTask to
correctly handle cancellations from sources other than the main subscription
token, preventing them from being incorrectly treated as server errors.

src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs [134-148]

 async Task MonitorSubscriberTask(Task subscriberTask, CancellationToken cancellationToken) {
     try {
         await subscriberTask.NoContext();
 
         // If the task completes without cancellation, the subscription was dropped
         if (!cancellationToken.IsCancellationRequested) {
             Dropped(DropReason.Stopped, null);
         }
     } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) {
         // Expected when shutting down
+    } catch (OperationCanceledException) {
+        // Another cancellation, which is not from the subscription token.
+        // This can happen if the handler is cancelled, and we should not treat it as an error.
     } catch (Exception ex) {
         // Subscriber task failed with an unrecoverable error
         Dropped(DropReason.ServerError, ex);
     }
 }
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential bug where a valid cancellation could be misinterpreted as a server error, leading to incorrect resubscription logic. The proposed fix correctly handles all cancellation scenarios, making the error handling more robust.

Medium
Avoid race conditions during unsubscription

Refactor the Unsubscribe method to use Task.WhenAll to concurrently await
_subscriberTask and _monitorTask, which simplifies the logic and improves
robustness.

src/GooglePubSub/src/Eventuous.GooglePubSub/Subscriptions/GooglePubSubSubscription.cs [150-154]

 protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
     if (_client != null) await _client.StopAsync(cancellationToken).NoContext();
-    await _subscriberTask.NoContext();
-    await _monitorTask.NoContext();
+    if (_subscriberTask != null! && _monitorTask != null!)
+        await Task.WhenAll(_subscriberTask, _monitorTask).NoContext();
 }
  • Apply / Chat
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that awaiting _subscriberTask and then _monitorTask is redundant since the latter awaits the former. Using Task.WhenAll is a cleaner and more robust pattern for ensuring both tasks are complete.

Low
  • More

@github-actions
Copy link

github-actions bot commented Dec 2, 2025

Test Results

 51 files  + 34   51 suites  +34   31m 53s ⏱️ + 21m 7s
282 tests + 10  282 ✅ + 10  0 💤 ±0  0 ❌ ±0 
849 runs  +566  849 ✅ +566  0 💤 ±0  0 ❌ ±0 

Results for commit 97efc6a. ± Comparison against base commit 57b538b.

This pull request removes 5 and adds 15 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/24/2025 12:31:29 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/24/2025 12:31:29 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(732a5bc5-3f5e-42d5-a5c7-277ff0ee25a2)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-24T12:31:28.8371614+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2025-11-24T12:31:28.8371614+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-24T12:31:28.8371614+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(12/2/2025 8:34:08 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(12/2/2025 8:34:08 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(12/2/2025 8:34:09 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(12/2/2025 8:34:09 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(12/2/2025 8:34:11 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(12/2/2025 8:34:11 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(5253c220-5616-4f07-a13f-9942b12c8a59)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(b91e96c4-dd87-49a4-9fa0-5d8b5c5af4cf)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(c7ee93d7-9ea8-4300-8139-234a4309ae8d)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-12-02T20:34:07.9518295+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-12-02T20:34:07.9518295+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2025-12-02T20:34:07.9518295+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-12-02T20:34:07.9518295+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-12-02T20:34:07.9518295+00:00 })
…

@alexeyzimarev alexeyzimarev merged commit 2f9d974 into Eventuous:dev Dec 3, 2025
5 checks passed
@michalnarwojsz-abax
Copy link
Contributor Author

Thank you for merging! Could some pre-release NuGet version be published with the fix included, so that we can reference it in our project?

@alexeyzimarev
Copy link
Contributor

All merged code is published as packages. It's on myget. Check the readme, it should be there

@michalnarwojsz-abax
Copy link
Contributor Author

All merged code is published as packages. It's on myget. Check the readme, it should be there

Perfect, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants